/*
 * Unidata Platform
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *
 * Commercial License
 * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *
 * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 * For clarification or additional options, please contact: info@unidata-platform.com
 * -------
 * Disclaimer:
 * -------
 * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 */
package org.unidata.mdm.dq.data.service.impl.function.data;

import static java.sql.ResultSet.CONCUR_READ_ONLY;
import static java.sql.ResultSet.TYPE_FORWARD_ONLY;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Objects;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.unidata.mdm.core.type.data.SimpleAttribute;
import org.unidata.mdm.core.type.data.SimpleAttribute.SimpleDataType;
import org.unidata.mdm.core.type.data.impl.AbstractSimpleAttribute;
import org.unidata.mdm.core.util.AttributeUtils;
import org.unidata.mdm.dq.core.context.CleanseFunctionContext;
import org.unidata.mdm.dq.core.dto.CleanseFunctionResult;
import org.unidata.mdm.dq.core.service.impl.function.system.AbstractSystemCleanseFunction;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionConfiguration;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionExecutionScope;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionInputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionOutputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortFilteringMode;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortInputType;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortValueType;
import org.unidata.mdm.dq.core.type.constant.CleanseConstants;
import org.unidata.mdm.dq.core.type.io.DataQualityError;
import org.unidata.mdm.dq.core.type.rule.SeverityIndicator;
import org.unidata.mdm.dq.core.util.DQUtils;
import org.unidata.mdm.system.util.TextUtils;

/**
 * Fetch data from external
 */
public class OuterFetch extends AbstractSystemCleanseFunction {
    /**
     * Display name code.
     */
    private static final String FUNCTION_DISPLAY_NAME = "app.dq.functions.data.outer.fetch.display.name";
    /**
     * Description code.
     */
    private static final String FUNCTION_DESCRIPTION = "app.dq.functions.data.avoid.duplicates.decsription";
    /**
     * IP1 name code.
     */
    private static final String INPUT_PORT_1_NAME = "app.dq.functions.data.outer.fetch.input.port1.name";
    /**
     * IP1 description code.
     */
    private static final String INPUT_PORT_1_DESCRIPTION = "app.dq.functions.data.outer.fetch.input.port1.decsription";
    /**
     * IP2 name code.
     */
    private static final String INPUT_PORT_2_NAME = "app.dq.functions.data.outer.fetch.input.port2.name";
    /**
     * IP2 description code.
     */
    private static final String INPUT_PORT_2_DESCRIPTION = "app.dq.functions.data.outer.fetch.input.port2.decsription";
    /**
     * IP3 name code.
     */
    private static final String INPUT_PORT_3_NAME = "app.dq.functions.data.outer.fetch.input.port3.name";
    /**
     * IP3 description code.
     */
    private static final String INPUT_PORT_3_DESCRIPTION = "app.dq.functions.data.outer.fetch.input.port3.decsription";
    /**
     * IP4 name code.
     */
    private static final String INPUT_PORT_4_NAME = "app.dq.functions.data.outer.fetch.input.port4.name";
    /**
     * IP4 description code.
     */
    private static final String INPUT_PORT_4_DESCRIPTION = "app.dq.functions.data.outer.fetch.input.port4.decsription";
    /**
     * OP1 name code.
     */
    private static final String OUTPUT_PORT_1_NAME = "app.dq.functions.data.outer.fetch.output.port1.name";
    /**
     * OP1 description code.
     */
    private static final String OUTPUT_PORT_1_DESCRIPTION = "app.dq.functions.data.outer.fetch.output.port1.decsription";
    /**
     * OP2 name code.
     */
    private static final String OUTPUT_PORT_2_NAME = "app.dq.functions.data.outer.fetch.output.port2.name";
    /**
     * OP2 description code.
     */
    private static final String OUTPUT_PORT_2_DESCRIPTION = "app.dq.functions.data.outer.fetch.output.port2.decsription";
    /**
     * OP3 name code.
     */
    private static final String OUTPUT_PORT_3_NAME = "app.dq.functions.data.outer.fetch.output.port3.name";
    /**
     * OP3 description code.
     */
    private static final String OUTPUT_PORT_3_DESCRIPTION = "app.dq.functions.data.outer.fetch.output.port3.decsription";
    /**
     * This function configuration.
     */
    private static final CleanseFunctionConfiguration CONFIGURATION
        = CleanseFunctionConfiguration.configuration()
            .supports(CleanseFunctionExecutionScope.GLOBAL, CleanseFunctionExecutionScope.LOCAL)
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_1)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_1_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_1_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.STRING)
                    .required(true)
                    .build())
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_2)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_2_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_2_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.STRING)
                    .required(true)
                    .build())
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_3)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_3_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_3_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.INTEGER)
                    .required(true)
                    .build())
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_4)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_4_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_4_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.STRING)
                    .required(true)
                    .build())
            .output(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.OUTPUT_PORT_1)
                    .displayName(() -> TextUtils.getText(OUTPUT_PORT_1_NAME))
                    .description(() -> TextUtils.getText(OUTPUT_PORT_1_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.ANY)
                    .required(false)
                    .build())
            .output(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.OUTPUT_PORT_2)
                    .displayName(() -> TextUtils.getText(OUTPUT_PORT_2_NAME))
                    .description(() -> TextUtils.getText(OUTPUT_PORT_2_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.BOOLEAN)
                    .required(false)
                    .build())
            .output(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.OUTPUT_PORT_3)
                    .displayName(() -> TextUtils.getText(OUTPUT_PORT_3_NAME))
                    .description(() -> TextUtils.getText(OUTPUT_PORT_3_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.INTEGER)
                    .required(false)
                    .build())
            .build();
    /**
     * Index of the first column
     */
    private static final int FIRST_COLUMN = 1;
    /**
     * Single result count
     */
    private static final long SINGLE_RESULT_COUNT = 1;
    /**
     * Instantiates a new cleanse function abstract.
     */
    public OuterFetch() {
        super("OuterFetch", () -> TextUtils.getText(FUNCTION_DISPLAY_NAME), () -> TextUtils.getText(FUNCTION_DESCRIPTION));
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public CleanseFunctionConfiguration configure() {
        return CONFIGURATION;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public CleanseFunctionResult execute(CleanseFunctionContext ctx) {

        CleanseFunctionResult result = new CleanseFunctionResult();

        CleanseFunctionInputParam jdbcUrl = ctx.getInputParam(CleanseConstants.INPUT_PORT_1); // String
        CleanseFunctionInputParam query = ctx.getInputParam(CleanseConstants.INPUT_PORT_2); // String
        CleanseFunctionInputParam fetchType = ctx.getInputParam(CleanseConstants.INPUT_PORT_3); // Long
        CleanseFunctionInputParam outputType = ctx.getInputParam(CleanseConstants. INPUT_PORT_4); // String

        if ((Objects.isNull(jdbcUrl) || jdbcUrl.isEmpty())
         || (Objects.isNull(query) || query.isEmpty())
         || (Objects.isNull(fetchType) || fetchType.isEmpty())
         || (Objects.isNull(outputType) || outputType.isEmpty())) {

            result.addError(DataQualityError.builder()
                    .ruleName(ctx.getRuleName())
                    .functionName(getName())
                    .category(DQUtils.CATEGORY_SYSTEM)
                    .message(TextUtils.getText("app.dq.functions.data.outer.fetch.missing.parameters"))
                    .severity(SeverityIndicator.RED)
                    .build());

            return result;
        }

        Object fetchedObject = null;
        long count = 0;
        try (Connection connection = DriverManager.getConnection(jdbcUrl.toSingletonValue());
             Statement statement = connection.createStatement(TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
             ResultSet resultSet = statement.executeQuery(query.toSingletonValue())) {

            Object lastResult = null;
            Object firstResult = null;
            while (resultSet != null && resultSet.next()) {
                count++;
                lastResult = resultSet.getObject(FIRST_COLUMN);
                firstResult = count == SINGLE_RESULT_COUNT ? lastResult : firstResult;
            }

            fetchedObject = fetchType.<Long>toSingletonValue() == 0 ? firstResult : lastResult;
        } catch (SQLException e) {

            result.addError(DataQualityError.builder()
                    .ruleName(ctx.getRuleName())
                    .functionName(getName())
                    .category(DQUtils.CATEGORY_SYSTEM)
                    .message(TextUtils.getText("app.dq.functions.data.outer.fetch.sql.exception", ExceptionUtils.getMessage(e)))
                    .severity(SeverityIndicator.RED)
                    .build());

            return result;
        }

        SimpleAttribute<?> retval
            = AbstractSimpleAttribute.of(
                    SimpleDataType.valueOf(outputType.toSingletonValue()),
                    CleanseConstants.OUTPUT_PORT_1);

        AttributeUtils.processSimpleAttributeValue(retval, fetchedObject);

        result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_1, retval));
        result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_2, count == SINGLE_RESULT_COUNT));
        result.putOutputParam(CleanseFunctionOutputParam.of(CleanseConstants.OUTPUT_PORT_3, count));

        return result;
    }
}
